package com.atguigu.gmall.realtime.app.func;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.common.GmallConfig;
import com.atguigu.gmall.realtime.utils.DimUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * Flink sink that writes dimension records into Phoenix and invalidates the
 * corresponding cache entry when a record is updated or deleted.
 *
 * <p>Each incoming element is read for the following keys:
 * <ul>
 *   <li>{@code sink_table} - name of the target table</li>
 *   <li>{@code data} - JSON object whose keys are used as column names and whose values are upserted</li>
 *   <li>{@code type} - operation type; {@code update} and {@code delete} trigger cache invalidation</li>
 * </ul>
 */
public class DimSink  extends RichSinkFunction<JSONObject> {
    // Created in open() and released in close(); transient because a JDBC connection is not serializable.
    private transient Connection conn;

    /**
     * Registers the Phoenix JDBC driver and opens the connection used by this sink instance.
     *
     * @param parameters configuration passed by the framework (not read here)
     * @throws Exception if the driver class cannot be loaded or the connection cannot be established
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // Register the driver
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
    }

    /**
     * Closes the Phoenix connection opened in {@link #open(Configuration)}.
     *
     * @throws Exception if closing the connection or the superclass cleanup fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (conn != null) {
                conn.close();
            }
        } finally {
            conn = null;
            super.close();
        }
    }

    /**
     * Upserts the {@code data} payload of the record into its target table and, for
     * {@code update}/{@code delete} records, removes the cached dimension entry.
     *
     * @param value   record carrying {@code sink_table}, {@code data} and {@code type}
     * @param context sink context (not used)
     * @throws RuntimeException if the upsert fails; the underlying {@link SQLException} is attached as the cause
     * @throws Exception        if cache invalidation fails
     */
    @Override
    public void invoke(JSONObject value, Context context) throws Exception {
        // Target table to write to
        String tableName = value.getString("sink_table");

        // Payload to be written
        JSONObject dataJsonObj = value.getJSONObject("data");

        if (dataJsonObj != null && dataJsonObj.size() > 0) {
            String upsertSql = genUpsertSql(tableName, dataJsonObj);

            try (PreparedStatement ps = conn.prepareStatement(upsertSql)) {
                // values() iterates in the same order as keySet() for an unmodified map,
                // so the bound parameters line up with the column list in the SQL.
                int paramIndex = 1;
                for (Object fieldValue : dataJsonObj.values()) {
                    // Values are bound as strings, matching the previous quoted-literal form
                    // (which rendered null as an empty string).
                    // NOTE(review): assumes the target columns are VARCHAR - confirm against the table DDL.
                    ps.setString(paramIndex++, fieldValue == null ? "" : fieldValue.toString());
                }
                ps.execute();
                // Note: the commit is issued explicitly here rather than relying on auto-commit
                conn.commit();
                System.out.println("执行的SQL语句为:" + upsertSql);
            } catch (SQLException e) {
                // Keep the original SQLException as the cause so the failure can be diagnosed
                throw new RuntimeException("向Phoenix中插入数据失败", e);
            }
        }

        // For update or delete operations, drop the cached dimension data.
        // Constant-first equals avoids an NPE when "type" is missing; the null check
        // on the payload avoids an NPE when the record has no "data" object.
        String type = value.getString("type");
        if (dataJsonObj != null && ("update".equals(type) || "delete".equals(type))) {
            DimUtil.deleteCache(tableName, dataJsonObj.getString("id"));
        }
    }

    /**
     * Builds a parameterized upsert statement for the given table, with one {@code ?}
     * placeholder per key of {@code dataJsonObj}.
     *
     * <p>Column values are bound by the caller instead of being concatenated into the SQL
     * text, so quotes or other special characters in the data cannot alter the statement.
     * The schema, table and column names are still concatenated because JDBC cannot bind
     * identifiers.
     *
     * @param tableName   target table name (without schema)
     * @param dataJsonObj payload whose keys become the column list; must be non-empty
     * @return the upsert statement text with placeholders
     */
    private String genUpsertSql(String tableName, JSONObject dataJsonObj) {
        StringBuilder placeholders = new StringBuilder();
        for (int i = 0; i < dataJsonObj.size(); i++) {
            if (i > 0) {
                placeholders.append(',');
            }
            placeholders.append('?');
        }

        return "upsert into " + GmallConfig.HBASE_SCHEMA
                + "." + tableName + " (" + StringUtils.join(dataJsonObj.keySet(), ",")
                + ") values(" + placeholders + ")";
    }
}
